扩展RDD API三部曲之第二部自定义操作算子
扩展RDD API三部曲,主要是帮助大家掌握如下三个内容:
1). 回顾一下RDD的基础
2). 扩展Action,也即是自定义RDD算子
3). 扩展 transform及自定义RDD
本文主要是将自定义Spark RDD算子中的Action 类型操作。
1. 准备阶段
讲到自定义RDD的action操作,大家首先应该想到的就是那些RDD到key-value算子的隐式转换,具体一点也就是PairRDDFunctions这个类里包含的算子,比如reducebykey等操作算子。
具体实现肯定是要比较了解scala的隐式转换操作,这个浪尖也发过文章了,可以点击下文阅读:
首先,我们要进行准备操作,首先定义一个case class
class SalesRecord(val transactionId: String,
val customerId: String,
val itemId: String,
val itemValue: Double) extends Comparable[SalesRecord]
with Serializable {
override def compareTo(o: SalesRecord): Int = {
return this.transactionId.compareTo(o.transactionId)
}
override def toString: String = {
transactionId+","+customerId+","+itemId+","+itemValue
}
}
然后,定义我们的主要函数:
val sparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")
.set("yarn.resourcemanager.hostname", "mt-mdh.local")
.set("spark.executor.instances","2")
.set("spark.default.parallelism","4")
.set("spark.sql.shuffle.partitions","4")
.setJars(List("/opt/sparkjar/bigdata.jar"
,"/opt/jars/spark-streaming-kafka-0-10_2.11-2.3.1.jar"
,"/opt/jars/kafka-clients-0.10.2.2.jar"
,"/opt/jars/kafka_2.11-0.10.2.2.jar"))
val sc = new SparkContext(sparkConf)
val dataRDD = sc.textFile("file:///opt/bigdata/src/main/data/sales.csv")
val salesRecordRDD = dataRDD.map(row => {
val colValues = row.split(",")
new SalesRecord(colValues(0),colValues(1),colValues(2),colValues(3).toDouble)
})
这个时候加入我们需要对itemValue字段求和,常见的做法是
salesRecordRDD.map(_.itemValue).sum
其实,sum就是DoubleRDDFunctions内部的算子,也是通过隐式转换实现的。
2. 自定义算子实现
然后就是要定义RDD的操作算子本身,也即是一个工具类,我们叫他为CustomFunctions,内部包含求和函数如下:
import org.apache.spark.rdd.RDD
class CustomFunctions(rdd:RDD[SalesRecord]) {
def totalSales = rdd.map(_.itemValue).sum
def discount(discountPercentage:Double) = new DiscountRDD(rdd,discountPercentage)
}
这个仔细读一下上面已有的隐式转换算子,可以发现还不行,需要为自定义RDD的操作算子,自定义一个隐士转换的算子工具,内容如下:
object CustomFunctions {
implicit def addCustomFunctions(rdd: RDD[SalesRecord]) = new CustomFunctions(rdd)
}
3. 使用算子
调用我们的转换方法:
import CustomFunctions._
println("Cunstom RDD API : "+salesRecordRDD.totalSales)
输出结果:
这就是自定义RDD的action操作。
下篇文章为自定义RDD和转换操作,这个就只会在星球里分享了欢迎加入浪尖的知识星球,与近420好友一起学习进步。
推荐阅读:
Spark源码系列之Standalone模式下Spark应用的整个启动过程
Spark源码系列之foreach和foreachPartition的区别
Hbase源码系列之BufferedMutator的Demo和源码解析